{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Note\n",
    "\n",
    "Please view the [README](https://github.com/eclipse/deeplearning4j/tree/master/dl4j-examples/tutorials/README.md) to learn about installing, setting up dependencies, and importing notebooks into Zeppelin."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Background\n",
    "\n",
    "In this tutorial we will use a neural network to forecast daily sea temperatures. It follows on from tutorial 15, Sea Temperature Convolutional LSTM Example. Recall that the data consists of 2-dimensional temperature grids of 8 seas (Bengal, Korean, Black, Mediterranean, Arabian, Japan, Bohai, and Okhotsk) from 1981 to 2017. The raw data was taken from the Earth System Research Laboratory (https://www.esrl.noaa.gov/psd/) and preprocessed into CSV files. Each example consists of fifty 2-dimensional temperature grids, and every grid is represented by a single row in a CSV file. Thus, each sequence is represented by a CSV file with 50 rows.\n",
    "\n",
    "For this task, we will use a convolutional LSTM neural network to forecast 10 days' worth of sea temperatures following a given sequence of temperature grids. The network will be trained in the same way as the network in tutorial 15, but the evaluation will be handled differently: it is applied only to the 10 days following each sequence."
   ]
  },
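  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The row-per-grid layout described above can be sketched in plain Scala. This is a minimal illustration, not part of the tutorial pipeline: the 13 x 4 grid size is taken from the `V_HEIGHT` and `V_WIDTH` values set later in this notebook, while the sample row and the row-major ordering of its values are assumptions made only for the example.\n",
    "\n",
    "```scala\n",
    "// Grid dimensions, matching V_HEIGHT and V_WIDTH used later in this tutorial.\n",
    "val gridHeight = 13\n",
    "val gridWidth = 4\n",
    "\n",
    "// Hypothetical CSV row: 52 comma-separated values standing in for one day's grid.\n",
    "val row: String = (0 until gridHeight * gridWidth).map(i => (i * 0.5).toString).mkString(\",\")\n",
    "\n",
    "// Parse the row and cut it into gridHeight rows of gridWidth values\n",
    "// (row-major ordering is an assumption).\n",
    "val values: Array[Double] = row.split(\",\").map(_.toDouble)\n",
    "val grid: Array[Array[Double]] = values.grouped(gridWidth).toArray\n",
    "\n",
    "println(s\"${values.length} values -> ${grid.length} x ${grid(0).length} grid\")\n",
    "```\n",
    "\n",
    "A full example is then 50 such rows, one per day."
   ]
  },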
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Imports"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {
    "autoscroll": "auto"
   },
   "outputs": [],
   "source": [
    "import org.deeplearning4j.nn.api.OptimizationAlgorithm;\n",
    "import org.deeplearning4j.nn.conf.NeuralNetConfiguration;\n",
    "import org.deeplearning4j.nn.conf.layers.LSTM;\n",
    "import org.deeplearning4j.nn.weights.WeightInit;\n",
    "import org.nd4j.linalg.activations.Activation;\n",
    "import org.nd4j.linalg.api.ndarray.INDArray;\n",
    "import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;\n",
    "import org.nd4j.linalg.dataset.api.iterator.DataSetIterator;\n",
    "import org.deeplearning4j.nn.conf.layers.RnnOutputLayer;\n",
    "import org.deeplearning4j.datasets.datavec.SequenceRecordReaderDataSetIterator;\n",
    "import org.nd4j.linalg.lossfunctions.LossFunctions.LossFunction;\n",
    "import org.deeplearning4j.nn.conf.MultiLayerConfiguration;\n",
    "import org.nd4j.linalg.dataset.DataSet;\n",
    "import org.deeplearning4j.nn.conf.preprocessor.RnnToCnnPreProcessor;\n",
    "import org.deeplearning4j.nn.conf.preprocessor.CnnToRnnPreProcessor;\n",
    "import org.deeplearning4j.nn.conf.GradientNormalization;\n",
    "import org.deeplearning4j.nn.conf.layers;\n",
    "import org.deeplearning4j.eval.RegressionEvaluation;\n",
    "import org.deeplearning4j.nn.conf.layers.ConvolutionLayer.Builder;\n",
    "import org.deeplearning4j.nn.conf.layers.ConvolutionLayer;\n",
    "import org.deeplearning4j.nn.conf.Updater;\n",
    "import org.deeplearning4j.nn.conf.layers.SubsamplingLayer;\n",
    "\n",
    "import org.datavec.api.records.reader.impl.csv.CSVSequenceRecordReader;\n",
    "import org.datavec.api.records.reader.SequenceRecordReader;\n",
    "import org.datavec.api.split.NumberedFileInputSplit;\n",
    "\n",
    "import org.nd4j.linalg.indexing.NDArrayIndex;\n",
    "import org.nd4j.linalg.factory.Nd4j;\n",
    "\n",
    "import java.io.File;\n",
    "import java.net.URL;\n",
    "import java.io.BufferedInputStream;\n",
    "import java.io.FileInputStream;\n",
    "import java.io.BufferedOutputStream;\n",
    "import java.io.FileOutputStream;\n",
    "\n",
    "import org.apache.commons.io.FilenameUtils;\n",
    "import org.apache.commons.io.FileUtils;\n",
    "import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;\n",
    "import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;\n",
    "import org.apache.commons.compress.archivers.tar.TarArchiveEntry;\n",
    "\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Download Data"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To download the data, we create a temporary directory to hold the data files and download the tar.gz archive from the URL into that directory."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {
    "autoscroll": "auto"
   },
   "outputs": [],
   "source": [
    "val DATA_URL = \"https://bpstore1.blob.core.windows.net/seatemp/sea_temp2.tar.gz\"\n",
    "val DATA_PATH = FilenameUtils.concat(System.getProperty(\"java.io.tmpdir\"), \"dl4j_seas/\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "autoscroll": "auto"
   },
   "outputs": [],
   "source": [
    "val directory = new File(DATA_PATH)\n",
    "directory.mkdir() \n",
    "\n",
    "val archizePath = DATA_PATH + \"sea_temp2.tar.gz\"\n",
    "val archiveFile = new File(archizePath)\n",
    "val extractedPath = DATA_PATH + \"sea_temp\" \n",
    "val extractedFile = new File(extractedPath)\n",
    "\n",
    "FileUtils.copyURLToFile(new URL(DATA_URL), archiveFile) "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We then extract the archive: each directory in the tar.gz file is recreated under our temporary directory, and each file is copied out of the archive into place."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {
    "autoscroll": "auto"
   },
   "outputs": [],
   "source": [
    "var fileCount = 0\n",
    "var dirCount = 0\n",
    "val BUFFER_SIZE = 4096\n",
    "val tais = new TarArchiveInputStream(new GzipCompressorInputStream( new BufferedInputStream( new FileInputStream(archizePath))))\n",
    "\n",
    "var entry = tais.getNextEntry().asInstanceOf[TarArchiveEntry]\n",
    "\n",
    "// Walk the archive entry by entry until getNextEntry() returns null\n",
    "while(entry != null){\n",
    "    if (entry.isDirectory()) {\n",
    "        // Recreate the directory under DATA_PATH\n",
    "        new File(DATA_PATH + entry.getName()).mkdirs()\n",
    "        dirCount = dirCount + 1\n",
    "        fileCount = 0\n",
    "    }\n",
    "    else {\n",
    "        // Copy the current entry's bytes to a file of the same name under DATA_PATH\n",
    "        val data = new Array[scala.Byte](BUFFER_SIZE)\n",
    "\n",
    "        val fos = new FileOutputStream(DATA_PATH + entry.getName());\n",
    "        val dest = new BufferedOutputStream(fos, BUFFER_SIZE);\n",
    "        var count = tais.read(data, 0, BUFFER_SIZE)\n",
    "        \n",
    "        while (count != -1) {\n",
    "            dest.write(data, 0, count)\n",
    "            count = tais.read(data, 0, BUFFER_SIZE)\n",
    "        }\n",
    "        \n",
    "        dest.close()\n",
    "        fileCount = fileCount + 1\n",
    "    }\n",
    "    // Print a progress dot\n",
    "    if(fileCount % 1000 == 0){\n",
    "        print(\".\")\n",
    "    }\n",
    "    \n",
    "    entry = tais.getNextEntry().asInstanceOf[TarArchiveEntry]\n",
    "}\n",
    "\n",
    "// Release the archive stream once every entry has been read\n",
    "tais.close()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### DataSetIterators"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Next we will convert the raw data (CSV files) into DataSetIterators, which will be fed into a neural network. The training data has 1600 examples, represented by one DataSetIterator, and the testing data has 136 examples, represented by a second DataSetIterator. The temperatures for the 10 days following each test sequence are held in a third DataSetIterator."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {
    "autoscroll": "auto"
   },
   "outputs": [],
   "source": [
    "val path = FilenameUtils.concat(DATA_PATH, \"sea_temp/\") // set parent directory\n",
    "\n",
    "val featureBaseDir = FilenameUtils.concat(path, \"features\") // set feature directory\n",
    "val targetsBaseDir = FilenameUtils.concat(path, \"targets\") // set label directory\n",
    "val futureBaseDir = FilenameUtils.concat(path, \"futures\") // set futures directory"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We first initialize CSVSequenceRecordReaders, which parse the raw data into a record-like format. The SequenceRecordReaderDataSetIterators can then be created from the record readers. Since every example has exactly 50 time steps, we use the equal-length alignment mode. Note also that this is a regression task, not a classification one.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {
    "autoscroll": "auto"
   },
   "outputs": [],
   "source": [
    "val numSkipLines = 1;\n",
    "val regression = true;\n",
    "val batchSize = 32;\n",
    "\n",
    "val trainFeatures = new CSVSequenceRecordReader(numSkipLines, \",\");\n",
    "trainFeatures.initialize( new NumberedFileInputSplit(featureBaseDir + \"/%d.csv\", 1, 1600));\n",
    "val trainTargets = new CSVSequenceRecordReader(numSkipLines, \",\");\n",
    "trainTargets.initialize(new NumberedFileInputSplit(targetsBaseDir + \"/%d.csv\", 1, 1600));\n",
    "\n",
    "val train = new SequenceRecordReaderDataSetIterator(trainFeatures, trainTargets, batchSize,\n",
    "                10, regression, SequenceRecordReaderDataSetIterator.AlignmentMode.EQUAL_LENGTH);\n",
    "                \n",
    "                \n",
    "val testFeatures = new CSVSequenceRecordReader(numSkipLines, \",\");\n",
    "testFeatures.initialize( new NumberedFileInputSplit(featureBaseDir + \"/%d.csv\", 1601, 1736));\n",
    "val testTargets = new CSVSequenceRecordReader(numSkipLines, \",\");\n",
    "testTargets.initialize(new NumberedFileInputSplit(targetsBaseDir + \"/%d.csv\", 1601, 1736));\n",
    "\n",
    "val test = new SequenceRecordReaderDataSetIterator(testFeatures, testTargets, batchSize,\n",
    "                10, regression, SequenceRecordReaderDataSetIterator.AlignmentMode.EQUAL_LENGTH);\n",
    "                \n",
    "val futureFeatures = new CSVSequenceRecordReader(numSkipLines, \",\");\n",
    "futureFeatures.initialize( new NumberedFileInputSplit(futureBaseDir + \"/%d.csv\", 1601, 1736));\n",
    "val futureLabels = new CSVSequenceRecordReader(numSkipLines, \",\");\n",
    "futureLabels.initialize(new NumberedFileInputSplit(futureBaseDir + \"/%d.csv\", 1601, 1736));\n",
    "\n",
    "val future = new SequenceRecordReaderDataSetIterator(futureFeatures, futureLabels, batchSize,\n",
    "                10, regression, SequenceRecordReaderDataSetIterator.AlignmentMode.EQUAL_LENGTH);"
   ]
  },
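  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As a rough check on the iterators above, the sketch below expands the `%d.csv` pattern over the same index ranges and counts the mini-batches each iterator would produce at a batch size of 32. It is plain Scala and only mimics the file numbering; it assumes both ends of each range are included, which matches the 1600 and 136 examples quoted earlier. The names `trainFiles`, `testFiles`, and `lastTestBatch` are introduced here for illustration.\n",
    "\n",
    "```scala\n",
    "// Same pattern and index ranges as the NumberedFileInputSplit calls above.\n",
    "val pattern = \"/%d.csv\"\n",
    "val trainFiles = (1 to 1600).map(i => pattern.format(i))\n",
    "val testFiles = (1601 to 1736).map(i => pattern.format(i))\n",
    "\n",
    "// Number of mini-batches per pass; the last test batch is smaller than the rest.\n",
    "val batch = 32\n",
    "val trainBatches = math.ceil(trainFiles.size.toDouble / batch).toInt\n",
    "val testBatches = math.ceil(testFiles.size.toDouble / batch).toInt\n",
    "val lastTestBatch = testFiles.size - (testBatches - 1) * batch\n",
    "\n",
    "println(s\"train: ${trainFiles.size} files, $trainBatches batches\")\n",
    "println(s\"test: ${testFiles.size} files, $testBatches batches, last batch holds $lastTestBatch\")\n",
    "```\n",
    "\n",
    "Because the test and future iterators cover the same index range with the same batch size, their batches line up one to one, which the evaluation step relies on."
   ]
  },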
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Neural Network"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The next task is to initialize the parameters for the convolutional LSTM neural network and then set up the neural network configuration.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {
    "autoscroll": "auto"
   },
   "outputs": [],
   "source": [
    "val V_HEIGHT = 13;\n",
    "val V_WIDTH = 4;\n",
    "val kernelSize = 2;\n",
    "val numChannels = 1;"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In the neural network configuration we will use a convolutional layer, a subsampling layer, an LSTM layer, and an output layer in succession. To do this, we need the RnnToCnnPreProcessor and the CnnToRnnPreProcessor. The RnnToCnnPreProcessor reshapes the 3-dimensional input from [batch size, height x width of grid, time series length] into a 4-dimensional shape [number of examples x time series length, channels, width, height], which is suitable as input to a convolutional layer. The CnnToRnnPreProcessor is then used before the LSTM layer to convert this convolutional shape back to a 3-dimensional shape."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {
    "autoscroll": "auto"
   },
   "outputs": [],
   "source": [
    "val conf = new NeuralNetConfiguration.Builder()\n",
    "    .optimizationAlgo(OptimizationAlgorithm.STOCHASTIC_GRADIENT_DESCENT)\n",
    "    .seed(12345)\n",
    "    .updater(Updater.ADAM)\n",
    "    .weightInit(WeightInit.XAVIER)\n",
    "    .list()\n",
    "    .layer(0, new ConvolutionLayer.Builder(kernelSize, kernelSize)\n",
    "        .nIn(numChannels) //1 channel\n",
    "        .nOut(7)\n",
    "        .stride(2, 2)\n",
    "        .activation(Activation.RELU)\n",
    "        .build())\n",
    "    .layer(1, new SubsamplingLayer.Builder(SubsamplingLayer.PoolingType.MAX)\n",
    "        .kernelSize(kernelSize, kernelSize)\n",
    "        .stride(2, 2).build())\n",
    "    .layer(2, new LSTM.Builder()\n",
    "        .activation(Activation.SOFTSIGN)\n",
    "        .nIn(21)\n",
    "        .nOut(100)\n",
    "        .gradientNormalization(GradientNormalization.ClipElementWiseAbsoluteValue)\n",
    "        .gradientNormalizationThreshold(10)\n",
    "        .build())\n",
    "    .layer(3, new RnnOutputLayer.Builder(LossFunction.MSE)\n",
    "        .activation(Activation.IDENTITY)\n",
    "        .nIn(100)\n",
    "        .nOut(52)\n",
    "        .gradientNormalization(GradientNormalization.ClipElementWiseAbsoluteValue)\n",
    "        .gradientNormalizationThreshold(10)\n",
    "        .build())\n",
    "    .inputPreProcessor(0, new RnnToCnnPreProcessor(V_HEIGHT, V_WIDTH, numChannels))\n",
    "    .inputPreProcessor(2, new CnnToRnnPreProcessor(3, 1, 7 ))\n",
    "    .pretrain(false).backprop(true)\n",
    "    .build();\n",
    "    \n",
    "val net = new MultiLayerNetwork(conf);\n",
    "net.init();"
   ]
  },
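  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The hard-coded sizes in the configuration above, `nIn(21)` for the LSTM layer and `CnnToRnnPreProcessor(3, 1, 7)`, can be checked with a little arithmetic. The sketch below assumes no padding and that sizes which do not divide evenly are rounded down; `outSize` is a helper written for this example, not a DL4J function.\n",
    "\n",
    "```scala\n",
    "// Output length of a convolution or pooling window with no padding\n",
    "// (integer division rounds down).\n",
    "def outSize(in: Int, kernel: Int, stride: Int): Int = (in - kernel) / stride + 1\n",
    "\n",
    "val (height, width) = (13, 4)   // V_HEIGHT, V_WIDTH\n",
    "val kernel = 2                   // kernelSize\n",
    "val stride = 2\n",
    "val channelsOut = 7              // nOut of the convolution layer\n",
    "\n",
    "val convH = outSize(height, kernel, stride)\n",
    "val convW = outSize(width, kernel, stride)\n",
    "val poolH = outSize(convH, kernel, stride)\n",
    "val poolW = outSize(convW, kernel, stride)\n",
    "val lstmIn = poolH * poolW * channelsOut\n",
    "\n",
    "println(s\"after convolution: $convH x $convW, after pooling: $poolH x $poolW, LSTM nIn: $lstmIn\")\n",
    "```\n",
    "\n",
    "Under these assumptions the 13 x 4 grid shrinks to 6 x 2 after the convolution and to 3 x 1 after pooling, and 3 x 1 positions across 7 feature maps give the 21 inputs of the LSTM layer."
   ]
  },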
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Model Training"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To train the model, we loop over 15 epochs and call the fit method of the MultiLayerNetwork on the training iterator in each one."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {
    "autoscroll": "auto"
   },
   "outputs": [],
   "source": [
    "// Train model on training set\n",
    "\n",
    "for( epoch <- 1 to 15){\n",
    "    println(\"Epoch \"+ epoch);\n",
    "    net.fit( train );\n",
    "    train.reset();\n",
    "}"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Model Evaluation"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We will now evaluate our trained model. Note that we will use RegressionEvaluation, since our task is a regression and not a classification task. We will only evaluate the model using the temperature of the 10 days following the given sequence of daily temperatures and not on the temperatures of the days in the sequence. This will be done using the rnnTimeStep() method of the MultiLayerNetwork.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "metadata": {
    "autoscroll": "auto"
   },
   "outputs": [],
   "source": [
    "val eval = new RegressionEvaluation();\n",
    "\n",
    "test.reset();\n",
    "future.reset();\n",
    "\n",
    "while(test.hasNext()) {\n",
    "    val next = test.next();\n",
    "    val features = next.getFeatures();\n",
    "\n",
    "    var pred =  Nd4j.zeros(1, 2);\n",
    "\n",
    "    for(i <- 0 to 49){\n",
    "        pred = net.rnnTimeStep(features.get(NDArrayIndex.all(), NDArrayIndex.all(), NDArrayIndex.interval(i,i+1)));\n",
    "    }\n",
    "\n",
    "    val correct = future.next();\n",
    "    val cFeatures = correct.getFeatures();\n",
    "\n",
    "    for(i <- 0 to 9){\n",
    "        eval.evalTimeSeries(pred, cFeatures.get(NDArrayIndex.all(), NDArrayIndex.all(), NDArrayIndex.interval(i,i+1)));\n",
    "        pred = net.rnnTimeStep(pred);\n",
    "    }\n",
    "    net.rnnClearPreviousState();\n",
    "}"
   ]
  },
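  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The evaluation loop above has two phases: it first steps through the 50 observed days to build up the recurrent state, then forecasts 10 days by feeding each prediction back in as the next input. The sketch below shows the same pattern on a toy model in plain Scala. `ToyRecurrentModel` is a hypothetical stand-in (an exponential moving average), not a DL4J class, and the input sequence is made up.\n",
    "\n",
    "```scala\n",
    "// Toy stand-in for a stateful network (hypothetical, not a DL4J class):\n",
    "// it keeps an exponential moving average as its hidden state.\n",
    "class ToyRecurrentModel(alpha: Double) {\n",
    "  private var state: Option[Double] = None\n",
    "  def timeStep(input: Double): Double = {\n",
    "    val next = state.map(s => alpha * input + (1 - alpha) * s).getOrElse(input)\n",
    "    state = Some(next)\n",
    "    next\n",
    "  }\n",
    "  def clearState(): Unit = { state = None }\n",
    "}\n",
    "\n",
    "val model = new ToyRecurrentModel(alpha = 0.5)\n",
    "val observed = (0 until 50).map(_.toDouble)   // made-up 50-step input sequence\n",
    "\n",
    "// Phase 1: prime the state on the observed sequence, keeping only the last output.\n",
    "var pred = 0.0\n",
    "for (x <- observed) pred = model.timeStep(x)\n",
    "\n",
    "// Phase 2: record each forecast, then feed it back in as the next input.\n",
    "val forecast = (0 until 10).map { _ =>\n",
    "  val current = pred\n",
    "  pred = model.timeStep(current)\n",
    "  current\n",
    "}\n",
    "\n",
    "model.clearState()   // reset before the next, unrelated sequence\n",
    "println(forecast.mkString(\", \"))\n",
    "```\n",
    "\n",
    "Clearing the state at the end plays the same role as `rnnClearPreviousState()` in the loop above: without it, the next sequence would start from state left over from the previous one."
   ]
  },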
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Here we print out the evaluation statistics."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {
    "autoscroll": "auto"
   },
   "outputs": [],
   "source": [
    "println(eval.stats())"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Spark 2.0.0 - Scala 2.11",
   "language": "scala",
   "name": "spark2-scala"
  },
  "language_info": {
   "codemirror_mode": "text/x-scala",
   "file_extension": ".scala",
   "mimetype": "text/x-scala",
   "name": "scala",
   "pygments_lexer": "scala",
   "version": "2.11.8"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
